Logstash:使用 Ruby 过滤器

您所在的位置:网站首页 logstash 变量 Logstash:使用 Ruby 过滤器

Logstash:使用 Ruby 过滤器

2024-01-12 13:50| 来源: 网络整理| 查看: 265

Logstash 具有丰富的过滤器集,你甚至可以编写自己的过滤器,但是由于没有现成的过滤器,你可以直接将 Ruby 代码 入配置文件中,因此通常不必创建自己的过滤器。

使用 logstash-filter-ruby,你可以使用Ruby字符串操作的所有功能来解析奇异的正则表达式,不完整的日期格式,写入文件,甚至进行 Web 服务调用。

Logstash 安装

如果你从来没有安装过自己的 Logstash,你可以参考我之前的文章 “如何安装 Elastic 栈中的 Logstash” 来进行安装。

例子事件

让我们在整个日志事件示例中使用一个包含3个字段的示例:

没有日期的时间戳 – 02:36.01源日志文件的完整路径 – /var/log/Service1/myapp.log字符串 – “Ruby is great”

该事件如下所示,我们将在接下来的示例中使用它。

02:36.01 /var/log/Service1/myapp.log Ruby is great

我们创建如下的 ruby-logstash.conf 文件:

ruby-logstash.conf

input { generator { message => "02:36.01 /var/log/Service1/myapp.log Ruby is great" count => 1 } } filter { } output { stdout { codec => rubydebug } }

现在运行 Logstash:

./bin/logstash -f ruby-logstash.conf

几秒钟后,它应该显示 “Pipeline main started”。

由于我们未提供任何过滤器,因此它只是将日志事件作为 “message” 字段回显,并添加了有关主机和事件处理时间的一些元数据。现在,让我们做最少的事情,并将这条消息分解为几个字段:

input { generator { message => "02:36.01 /var/log/Service1/myapp.log Ruby is great" count => 1 } } filter { grok { match => { "message" => "%{DATA:justtime} %{DATA:logsource} %{GREEDYDATA:msg}" } } } output { stdout { codec => rubydebug } }

这次我们使用了 grok 过滤器来结构化我们的数据。再次运行 Logstash:

显然这次,我们的数据变得结构化一点。但是就像我们之前提到的那样,这里的 justtime 显示是没有日期的。这个在我们的 ES 里是没法使用。我们在接下来的部分介绍如何使用 ruby 过滤器来对它进行加工。

使用 ruby 来解析不完整的日期

请注意,时间 '02:36.01' 非常稀疏。

它不提供日期,也不提供时区参考。  假设,我们必须假设日期为当天,只有在检查了服务器设置及其构造日志文件后,我们才知道实时时区。

让我们将以下处理添加到 ruby-logstash.conf 文件中的 filter 部分。 首先,我们允许标准的 logstash 'date' 处理解析(这将产生不正确的结果),然后在下面,我们使用 'ruby' 扩展今天的日期,然后解析结果(得出正确的结果)。

ruby-logstash.conf

input { generator { message => "02:36.01 /var/log/Service1/myapp.log Ruby is great" count => 1 } } filter { grok { match => { "message" => "%{DATA:justtime} %{DATA:logsource} %{GREEDYDATA:msg}" } } # incorrectly autopopulates to first day of year date { match => [ "justtime", "HH:mm.ss" ] target => "incorrectfulldatetime" timezone => "America/Los_Angeles" } # date # use ruby to augment with current day ruby { code => " event.set('fulldatetime', Time.now.strftime('%Y-%m-%d') + ' ' + event.get('justtime')) " } date { match => [ "fulldatetime", "YYYY-MM-dd HH:mm.ss" ] target => "correctfulldatetime" # target => "@timestamp" timezone => "America/Los_Angeles" } # date } output { stdout { codec => rubydebug } }

在上面,我们使用了内置的 ruby 代码来实现我们通常的开箱即用的 filter 所实现不了的过滤器。有些时候,甚至为了可维护性,我们可以把 ruby 的代码放入到一个文件中,比如:

filter { ruby { # Cancel 90% of events path => "/etc/logstash/drop_percentage.rb" script_params => { "percentage" => 0.9 } } }

重新运行  Logstash:

请注意,由于未提供日期部分,因此为一年中的第一天设置的“ invalidfulldatetime” 是2020年1月1日。 并注意“ correctfulldatetime” 如何使用 2020-08-12(当天)。

我们在上面创建了一个名为 “correctfulldatetime” 的新字段,但是,如果你要更新标准 timestamp 字段,则只需把 target 设置为 “@timestamp” 即可。

Ruby 解析更为复杂的句子

源日志文件名 “/var/log/Service1/myapp.log” 包含我们要提取的嵌入数据。 父目录 “Service1” 目录指示服务的名称,它可以嵌套在任意深处,并且也需要小写。

虽然这肯定是我们可以使用标准 logstash 正则表达式提取值的情况,但让我们使用 Ruby,因为你可能会遇到这样的情况:标准正则表达式无法覆盖你的需求,或者该表达式过于复杂以至于你需要维护 将问题分解为易于理解的步骤。

因此,我们将以下内容添加到 ruby-logstash.conf 过滤器中:

ruby-logstash.conf

input { generator { message => "02:36.01 /var/log/Service1/myapp.log Ruby is great" count => 1 } } filter { grok { match => { "message" => "%{DATA:justtime} %{DATA:logsource} %{GREEDYDATA:msg}" } } # incorrectly autopopulates to first day of year date { match => [ "justtime", "HH:mm.ss" ] target => "incorrectfulldatetime" timezone => "America/Los_Angeles" } # date # use ruby to augment with current day ruby { code => " event.set('fulldatetime', Time.now.strftime('%Y-%m-%d') + ' ' + event.get('justtime')) " } date { match => [ "fulldatetime", "YYYY-MM-dd HH:mm.ss" ] # target => "correctfulldatetime" target => "@timestamp" timezone => "America/Los_Angeles" } # date # split apart log source to extract service name ruby { code => " fpath = event.get('logsource').split('/') event.set('serviceName', fpath[fpath.length-2].downcase) " } } output { stdout { codec => rubydebug } }

重新运行 Logstash:

在上面,我们可以看到一个新的字段 servideName 被创建了。

Ruby 写入一个本地文件

无论是用于调试,指标还是检测,让 Logstash 作为服务运行将字段值写到本地磁盘都是很有用的。 如果你绝对需要原子性,请使用文件锁。在上面的 ruby-logstash.conf 中的 filer 部分添加:

# append msg field to disk ruby { code => " File.open('/tmp/mydebug.log','a') { |f| f.puts event.get('msg') } " }

重新运行 Logstash。我们可以看到如下生产的文件:

$ ls /tmp/mydebug.log /tmp/mydebug.log

Ruby 调用微服务

如果你正在进行高级集成,则可能需要显示消息 “Ruby is great”,并根据该信息进行某种 REST/Web/network 查找。

为了简单起见,让我们以消息 “Ruby” 的第一个单词为例,并调用一个基于 REST 的 echo 服务,该服务以json格式返回结果。 这是你要添加到过滤器中的代码:

# call out to REST based echo service ruby { init => " require 'net/http' require 'json' " code => " firstWord = event.get('msg').split(' ')[0] uri = URI.parse('http://echo.jsontest.com/word/' + firstWord) response = Net::HTTP.get_response(uri) if response.code == '200' result = JSON.parse(response.body) returnWord = result['word'] event.set('echo', firstWord + ' echoed back as: ' + returnWord) else event.set('echo', 'ERROR reaching web service') end " }

重新运行 Logstash:

在上面,我们可以看到新增加的 echo 字段。

整个最终的 ruby-logstash.conf 的代码如下:

ruby-logstash.conf

input { generator { message => "02:36.01 /var/log/Service1/myapp.log Ruby is great" count => 1 } } filter { grok { match => { "message" => "%{DATA:justtime} %{DATA:logsource} %{GREEDYDATA:msg}" } } # incorrectly autopopulates to first day of year date { match => [ "justtime", "HH:mm.ss" ] target => "incorrectfulldatetime" timezone => "America/Los_Angeles" } # date # use ruby to augment with current day ruby { code => " event.set('fulldatetime', Time.now.strftime('%Y-%m-%d') + ' ' + event.get('justtime')) " } date { match => [ "fulldatetime", "YYYY-MM-dd HH:mm.ss" ] # target => "correctfulldatetime" target => "@timestamp" timezone => "America/Los_Angeles" } # date # split apart log source to extract service name ruby { code => " fpath = event.get('logsource').split('/') event.set('serviceName', fpath[fpath.length-2].downcase) " } # append msg field to disk ruby { code => " File.open('/tmp/mydebug.log','a') { |f| f.puts event.get('msg') } " } # call out to REST based echo service ruby { init => " require 'net/http' require 'json' " code => " firstWord = event.get('msg').split(' ')[0] uri = URI.parse('http://echo.jsontest.com/word/' + firstWord) response = Net::HTTP.get_response(uri) if response.code == '200' result = JSON.parse(response.body) returnWord = result['word'] event.set('echo', firstWord + ' echoed back as: ' + returnWord) else event.set('echo', 'ERROR reaching web service') end " } } output { stdout { codec => rubydebug } }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3